package elephant.core;

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import elephant.Lifecycle;
import elephant.util.ObjectPrinter;

/**
 * Lifecycle-managed holder of two executors: a bounded worker pool for
 * one-off tasks and a scheduled pool for periodic tasks.
 * <p>
 * Both executors are created in {@link #start()} and shut down in
 * {@link #stop()}. Pool sizes set through the setters are only read when
 * {@link #start()} runs, so they must be configured before that call.
 *
 * @author icecooly
 *
 */
public class ThreadPool implements Lifecycle{
	//
	private static final Logger logger=LoggerFactory.getLogger(ThreadPool.class);
	//
	private static final int DEFAULT_CORE_POOL_SIZE=32;
	private static final int DEFAULT_MAX_POOL_SIZE=64;
	//
	private static final String WORKER_THREAD_POOL_NAME="WorkerThread";
	private static final String SCHEDULED_THREAD_POOL_NAME="ScheduledThread";
	//
	private ThreadPoolExecutor workerThreadPool;

	private ScheduledExecutorService scheduledThreadPool;
	
	private LinkedBlockingQueue<Runnable> requestQueue;
	//
	private int workerPoolSize;
	private int maxWorkerPoolSize;
	private int linkedBlockingQueueSize;
	private int scheduledPoolSize;
	//
	/**
	 * Creates a pool holder with the default sizes. No executor is created
	 * until {@link #start()} is called.
	 */
	public ThreadPool() {
		workerPoolSize=DEFAULT_CORE_POOL_SIZE;
		maxWorkerPoolSize=DEFAULT_MAX_POOL_SIZE;
		// Derived once from the default core size; a later
		// setWorkerPoolSize() call does not change this capacity.
		linkedBlockingQueueSize=workerPoolSize*2;
		scheduledPoolSize=5;
	}
	//
	/**
	 * Logs the init event; no resources are allocated here.
	 */
	@Override
	public void init() {
		logger.info("{} init",getClass().getSimpleName());
	}
	//
	/**
	 * Creates the worker and scheduled executors from the currently
	 * configured sizes.
	 */
	@Override
	public void start() {
		logger.info("{} start",getClass().getSimpleName());
		requestQueue=new LinkedBlockingQueue<Runnable>(linkedBlockingQueueSize);
		workerThreadPool=new ThreadPoolExecutor(
				workerPoolSize, maxWorkerPoolSize,
				60L,
				TimeUnit.SECONDS, 
				requestQueue,
				new ContainerThreadFactory(WORKER_THREAD_POOL_NAME));
		// When the queue is full and the pool is at its maximum size the
		// submitting thread runs the task itself instead of rejecting it.
		workerThreadPool.setRejectedExecutionHandler(
				new ThreadPoolExecutor.CallerRunsPolicy());
		//
		// Use the same name constant that dump() reports so diagnostics
		// match the factory name actually used.
		scheduledThreadPool=new ScheduledThreadPoolExecutor(
				scheduledPoolSize,
				new ContainerThreadFactory(SCHEDULED_THREAD_POOL_NAME),
				new ThreadPoolExecutor.AbortPolicy());
	}

	/**
	 * Initiates an orderly shutdown of both executors. Either executor may
	 * be {@code null} if {@link #start()} was never called.
	 */
	@Override
	public void stop() {
		logger.info("{} stop",getClass().getSimpleName());
		if(workerThreadPool!=null){
			workerThreadPool.shutdown();
		}
		// The scheduled pool must be shut down as well, otherwise its
		// threads and periodic tasks outlive this component.
		if(scheduledThreadPool!=null){
			scheduledThreadPool.shutdown();
		}
	}
	//
	/**
	 * Submits a task to the worker pool, wrapped in a {@code WorkerRunnable}.
	 *
	 * @param worker the task to run
	 * @throws NullPointerException if called before {@link #start()}, since
	 *         the worker pool does not exist yet
	 */
	public void execute(Runnable worker){
		workerThreadPool.execute(new WorkerRunnable(worker));
	}
	//
	/**
	 * Schedules a periodic task on the scheduled pool; the arguments are
	 * passed through unchanged to
	 * {@link ScheduledExecutorService#scheduleAtFixedRate(Runnable, long, long, TimeUnit)}.
	 *
	 * @param command the task to run
	 * @param initialDelay delay before the first run
	 * @param period interval between successive runs
	 * @param unit time unit of {@code initialDelay} and {@code period}
	 * @return the future returned by the scheduled pool
	 * @throws NullPointerException if called before {@link #start()}, since
	 *         the scheduled pool does not exist yet
	 */
	public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
			long initialDelay, long period, TimeUnit unit) {
		return scheduledThreadPool.scheduleAtFixedRate(command,
				initialDelay, period, unit);
	}
	//
	/**
	 * Sets the core size of the worker pool; read by {@link #start()}.
	 *
	 * @param workerPoolSize core thread count
	 */
	public void setWorkerPoolSize(int workerPoolSize) {
		this.workerPoolSize = workerPoolSize;
	}
	//
	/**
	 * Sets the maximum size of the worker pool; read by {@link #start()}.
	 *
	 * @param maxWorkerPoolSize maximum thread count
	 */
	public void setMaxWorkerPoolSize(int maxWorkerPoolSize) {
		this.maxWorkerPoolSize = maxWorkerPoolSize;
	}
	//
	/**
	 * Sets the size of the scheduled pool; read by {@link #start()}.
	 *
	 * @param scheduledPoolSize scheduled pool thread count
	 */
	public void setScheduledPoolSize(int scheduledPoolSize) {
		this.scheduledPoolSize = scheduledPoolSize;
	}
	//
	/**
	 * Renders the configured names and sizes as text.
	 *
	 * @return the text built by {@code ObjectPrinter}
	 */
	@Override
	public String dump() {
		ObjectPrinter op=new ObjectPrinter();
		op.section(getClass().getSimpleName());
		op.print("workerThreadName",WORKER_THREAD_POOL_NAME);
		op.print("workerPoolSize",workerPoolSize);
		op.print("maxWorkerPoolSize",maxWorkerPoolSize);
		op.print("BlockingQueueSize",linkedBlockingQueueSize);
		op.print("scheduledThreadName",SCHEDULED_THREAD_POOL_NAME);
		op.print("scheduledPoolSize",scheduledPoolSize);
		return op.toString();
	}
}
